In [ ]:
import pandas as pd
import mmlspark
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
In [ ]:
dataFile = "BookReviewsFromAmazon10K.tsv"
textSchema = StructType([StructField("rating", IntegerType(), False),
StructField("text", StringType(), False)])
import os, urllib
if not os.path.isfile(dataFile):
urllib.request.urlretrieve("https://mmlspark.azureedge.net/datasets/"+dataFile, dataFile)
data = spark.createDataFrame(pd.read_csv(dataFile, sep="\t", header=None), textSchema)
data.limit(10).toPandas()
Modify the label column to predict a rating greater than 3.
In [ ]:
processedData = data.withColumn("label", data["rating"] > 3) \
.select(["text", "label"])
processedData.limit(5).toPandas()
Split the dataset into train, test and validation sets.
In [ ]:
train, test, validation = processedData.randomSplit([0.60, 0.20, 0.20])
Use Tokenizer and Word2Vec to generate the features.
In [ ]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, Word2Vec
tokenizer = Tokenizer(inputCol="text", outputCol="words")
partitions = train.rdd.getNumPartitions()
word2vec = Word2Vec(maxIter=4, seed=42, inputCol="words", outputCol="features",
numPartitions=partitions)
textFeaturizer = Pipeline(stages = [tokenizer, word2vec]).fit(train)
Transform each of the train, test and validation datasets.
In [ ]:
ptrain = textFeaturizer.transform(train).select(["label", "features"])
ptest = textFeaturizer.transform(test).select(["label", "features"])
pvalidation = textFeaturizer.transform(validation).select(["label", "features"])
ptrain.limit(5).toPandas()
Generate several models with different parameters from the training data.
In [ ]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from mmlspark.TrainClassifier import TrainClassifier
import itertools
lrHyperParams = [0.05, 0.2]
logisticRegressions = [LogisticRegression(regParam = hyperParam)
for hyperParam in lrHyperParams]
lrmodels = [TrainClassifier(model=lrm, labelCol="label").fit(ptrain)
for lrm in logisticRegressions]
rfHyperParams = itertools.product([5, 10], [3, 5])
randomForests = [RandomForestClassifier(numTrees=hyperParam[0], maxDepth=hyperParam[1])
for hyperParam in rfHyperParams]
rfmodels = [TrainClassifier(model=rfm, labelCol="label").fit(ptrain)
for rfm in randomForests]
rfHyperParams = itertools.product([8, 16], [3, 5])
gbtclassifiers = [GBTClassifier(maxBins=hyperParam[0], maxDepth=hyperParam[1])
for hyperParam in rfHyperParams]
gbtmodels = [TrainClassifier(model=gbt, labelCol="label").fit(ptrain)
for gbt in gbtclassifiers]
trainedModels = lrmodels + rfmodels + gbtmodels
Find the best model for the given test dataset.
In [ ]:
from mmlspark import FindBestModel
bestModel = FindBestModel(evaluationMetric="AUC", models=trainedModels).fit(ptest)
Get the accuracy from the validation dataset.
In [ ]:
from mmlspark.ComputeModelStatistics import ComputeModelStatistics
predictions = bestModel.transform(pvalidation)
metrics = ComputeModelStatistics().transform(predictions)
print("Best model's accuracy on validation set = "
+ "{0:.2f}%".format(metrics.first()["accuracy"] * 100))
print("Best model's AUC on validation set = "
+ "{0:.2f}%".format(metrics.first()["AUC"] * 100))